In our solution, we used the YOLOv5 model by Ultralytics to detect whether people wear a COVID mask or not.
First, we annotate the dataset using the Coco Annotator tool. We decided to annotate most of the face areas and excluded the blurred ones, and to consider two classes "mask", "no-mask".
As specified in the attached hyp.covidmask.yaml file, we used slightly different augmentations from the defaults provided by Ultralytics when training a model pre-trained on COCO. We used: image HSV-Hue, image HSV-Saturation, and image HSV-Value augmentations, a horizontal flip (left to right), and a rotation. To better suit face detection, we added to the default augmentations a -20/+20 degrees rotation and made sure that the vertical flip is disabled.
For the "dirty training implementation", first, we decided to train the yolov5x pre-trained on COCO model, because the COCO dataset already contains face features, so we could use that advantage to fine-tune the pre-trained yolov5x to detect faces with or without masks. So naturally, we changed the number of classes to fit our problem (2), and this way, we made this model distinguish between two kinds of faces (mask, no-mask).
Unfortunately, the 10-epoch model was the worst; there are a few reasons for this:
We tagged this as an "underfit" event, as the model was actually training (the loss slowly reduced, and the mAP very slowly increased) and it took about only a few minutes, so we decided to NOT change the learning rate (the model was actually learning) and to increase the batch size and the number of epochs to 16, 350 respectively. This training is also "dirty" because we could still reach a better result. It took less time than 350 epochs sounds, due to the small dataset (about 3-4 hours of training). Eventually, we got a nice result with some slight mistakes, demonstrated below.
Eylon Mizrahi Daniel Ivkovich
Albumentations
!pip install albumentations
!pip install -U git+https://github.com/albumentations-team/albumentations
YoloV5
# Nvidia Apex installation to speed up Yolov5 training
!git clone https://github.com/NVIDIA/apex && cd apex && pip install -v --no-cache-dir --global-option="--cpp_ext" --global-option="--cuda_ext" . --user && cd .. && rm -rf apex
# Cloning YoloV5 from GitHub
# !git clone https://github.com/ultralytics/yolov5
Mount drive
from google.colab import drive
drive.mount('/content/gdrive')
Loading the relevant data and models (after training) from drive
!cp "/content/gdrive/My Drive/CV2 course/lab1.zip" "/content"
!unzip "/content/lab1.zip" -d "/content/"
!cp "/content/gdrive/My Drive/CV2 course/yolov5_covidmask.zip" "/content"
!unzip "/content/yolov5_covidmask.zip" -d "/content/"
import sys
import numpy as np
import cv2
import random
import glob
import os
import gc
import shutil as sh
import itertools
import albumentations as A
import json
import torch
import torch.nn.functional as F
import torchvision
import torchvision.models as models
from matplotlib import pyplot as plt
from tqdm.auto import tqdm
from itertools import chain, combinations
from torch.utils.data.sampler import SequentialSampler, RandomSampler
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
%matplotlib inline
Determining the device
# Select the computation device: the first CUDA GPU when available, else the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)
!nvidia-smi
# Batch sizes for training and for single-image evaluation/inference.
TRAIN_BATCH_SIZE = 8
TEST_BATCH_SIZE = 1
# Square side (pixels) every image is resized to before training/inference.
IMG_SIZE = 512
# Trained YOLOv5 weight checkpoints: best of the 350-epoch run, last of the 10-epoch run.
YOLO_PATH_350 = "/content/yolov5/runs/yolov5x_covidmask_350/weights/best.pt"
YOLO_PATH_10 = "/content/yolov5/runs/yolov5x_covidmask_10/weights/last.pt"
# COCO-style annotation file and the source dataset directory.
JSON_PATH = '/content/lab1/covidmask_ds.json'
DS_PATH = "/content/lab1/covidmask_ds"
# Destination for the dataset converted to the darknet/YOLOv5 layout.
DARKNET_DS_PATH = "/content/lab1/yolov5x-covidmask-dataset"
class Im(object):
    """Small convenience wrapper around an image file path.

    Loads the image on demand as RGB, BGR, or grayscale (caching the result
    on the instance) and provides static helpers for showing one or several
    images with matplotlib.
    """

    def __init__(self, path):
        # Lazily-populated caches; each reader re-reads the file from disk.
        self.path = path
        self.rgb_image = None
        self.bgr_image = None
        self.gray_image = None

    def read(self, return_image=False):
        """Load the file as an RGB array; optionally return it."""
        self.rgb_image = plt.imread(self.path)
        return self.rgb_image if return_image else None

    def bgr(self, return_image=False):
        """Load the file and reverse the channel axis (RGB -> BGR)."""
        self.bgr_image = np.flip(plt.imread(self.path), 2)
        return self.bgr_image if return_image else None

    def gray(self, return_image=False):
        """Load the file and convert it to a single-channel gray image."""
        self.gray_image = cv2.cvtColor(plt.imread(self.path), cv2.COLOR_RGB2GRAY)
        return self.gray_image if return_image else None

    @staticmethod
    def show(image, title='image'):
        """Render one image (gray colormap for 2-D arrays) without axes."""
        cmap = None if len(image.shape) == 3 else 'gray'
        plt.imshow(image, cmap=cmap)
        plt.axis('off')
        plt.title(title)

    @staticmethod
    def show_all(image_list, title_list, n=-1):
        """Render the first n images (all of them when n == -1) side by side."""
        assert len(image_list) == len(title_list), "Incompatible lengths of lists!"
        assert n <= len(title_list), "Invalid argument n!"
        count = len(image_list) if n == -1 else n
        plt.figure(figsize=[20, 20])
        for idx in range(count):
            plt.subplot(1, count, idx + 1)
            Im.show(image_list[idx], title_list[idx])
        plt.show()
def collate_fn(batch):
    """Transpose a batch of (image, target, ...) samples into per-field tuples."""
    columns = zip(*batch)
    return tuple(columns)
class CovidMaskDataset(Dataset):
    """Face-mask detection dataset backed by an image directory and a COCO-style JSON file.

    __getitem__ yields (image, target) or (image, target, image_name) tuples, where
    target holds 'boxes', 'labels', 'area', 'iscrowd' and 'image_id'.
    """

    def __init__(self, root_path, dest_type, transform=None, process="train", k=10, k_pos=0, coco_format=True, json_path=None, return_img_name=False):
        # root_path: root directory holding the dataset images.
        # dest_type: requested image type for __getitem__ ('bgr', 'gray', 'rgb').
        # transform: albumentations pipeline applied to image + boxes + labels.
        # process / k / k_pos: process = "train"/"val" together with k, k_pos selects
        #   the fold of a k-fold cross-validation split; k_pos is the start position
        #   of the validation slice among the k folds. k=None disables the split.
        # coco_format: when True, [x, y, w, h] COCO boxes are converted to
        #   pascal_voc [x1, y1, x2, y2].
        # json_path: COCO-style annotation file with 'images' and 'annotations'.
        # return_img_name: when True, __getitem__ also returns the image's stem name.
        # NOTE(review): this identity comparison of two unrelated strings is almost
        # always True; it looks like a leftover guard — confirm its intent.
        assert root_path is not dest_type, "Paths need to be unique!"
        assert dest_type in ['rgb', 'bgr', 'gray'], "Invalid types!"
        assert process in ['train', 'val', 'test'], "Invalid process!"
        # BUGFIX: the original asserted k_pos < k unconditionally, which raised a
        # TypeError for the documented k=None ("no split") mode. Guard on k first.
        assert k is None or k_pos < k, "Incompatible ratio between k and k_pos!"
        super(CovidMaskDataset, self).__init__()
        self.transform = transform
        self.process = process
        self.dest_type = dest_type
        self.return_img_name = return_img_name
        self.coco_format = coco_format
        self.paths = glob.glob(f'{root_path}/*')
        if k is not None:
            if self.process == "train":
                # Training fold: everything except the k_pos-th slice.
                before_pos = self.paths[: len(self.paths) * k_pos // k]
                after_pos = self.paths[len(self.paths) * (k_pos + 1) // k : len(self.paths)]
                self.paths = before_pos + after_pos
            elif self.process == "val":
                # Validation fold: only the k_pos-th slice.
                self.paths = self.paths[len(self.paths) * k_pos // k : len(self.paths) * (k_pos + 1) // k]
        random.shuffle(self.paths)  # Shuffle the paths
        self.annotations = self.get_all_targets(json_path)

    def get_target_by_name(self, img_id, json_annotations):
        """Build the target dict for the image with the given COCO image id."""
        target = dict()
        boxes, labels, area, iscrowd = list(), list(), 0, list()
        for anno in json_annotations:
            if img_id == anno['image_id']:
                boxes.append(anno['bbox'])
                # Category ids are shifted so labels start at 0.
                labels.append(anno['category_id'] - 1)
                area += anno['area']
        boxes = torch.tensor(boxes, dtype=torch.float32)
        iscrowd = torch.ones(len(boxes), dtype=torch.int64)  # suppose all instances are crowded
        if len(boxes) > 0 and self.coco_format:
            # Converting from Coco [x, y, w, h] to Pascal_voc [x1, y1, x2, y2] format
            boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
            boxes[:, 3] = boxes[:, 1] + boxes[:, 3]
        target["boxes"] = boxes
        target["labels"] = labels  # plain list here; tensorized in __getitem__ when transformed
        target["area"] = torch.tensor(area)
        target["iscrowd"] = iscrowd
        return target

    def get_all_targets(self, json_path):
        """Map each image's stem name to its target dict, parsed from json_path."""
        annotations = dict()
        with open(json_path) as json_file:
            data = json.load(json_file)
        imgs_data = data['images']
        json_annotations = data['annotations']
        for p in self.paths:
            img_name = p[p.rfind('/') + 1 : p.rfind('.')]
            # Match dataset files to JSON 'images' entries by file stem.
            for img_data in imgs_data:
                p_ = img_data['path']
                img_name_ = p_[p_.rfind('/') + 1 : p_.rfind('.')]
                if img_name == img_name_:
                    img_id = img_data['id']
                    annotations[img_name] = self.get_target_by_name(img_id, json_annotations)
        return annotations

    def __getitem__(self, idx):
        p = self.paths[idx]
        x = cv2.imread(p)  # BGR, HWC, uint8
        img_name = p[p.rfind('/') + 1 : p.rfind('.')]
        y = self.annotations[img_name]
        y["image_id"] = torch.tensor(idx)
        if self.transform is not None:
            sample = self.transform(**{
                'image': x,
                'bboxes': y['boxes'],
                'labels': y['labels'],
                'area': y['area'],
                'crowd': y['iscrowd'],
                'id': y['image_id']
            })
            x = sample['image']
            y['boxes'] = torch.as_tensor(sample['bboxes'], dtype=torch.float32)
            y['area'] = torch.as_tensor(sample['area'], dtype=torch.float32)
            y['iscrowd'] = torch.as_tensor(sample['crowd'], dtype=torch.int64)
            y['labels'] = torch.as_tensor(sample['labels'], dtype=torch.int64)
            y['image_id'] = torch.as_tensor(sample['id'], dtype=torch.int64)
        # BUGFIX: the original compared strings with 'is' (identity), which only
        # worked through CPython string interning; use '==' (equality).
        if self.dest_type == 'rgb' or self.dest_type == 'bgr':
            if self.dest_type == 'rgb':
                x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
            x = torch.from_numpy(x).permute(2, 0, 1)  # HWC -> CHW
        elif self.dest_type == 'gray':
            x = cv2.cvtColor(x, cv2.COLOR_BGR2GRAY)
            x = torch.from_numpy(x)
        x = x.float()
        if self.return_img_name:
            return x, y, img_name
        return x, y

    def __len__(self):
        return len(self.paths)
def draw_bboxes(img, boxes, text=None, text_size=None, thickness=3):
    """Draw bounding boxes (pascal_voc [x1, y1, x2, y2]) on a copy of img.

    text is an optional list of per-box labels; 'mask' boxes are drawn in
    (0, 50, 255) and 'no-mask' boxes in (255, 0, 0). text_size sets the label
    font scale; labels are drawn only when both text and text_size are given.
    Returns the annotated copy; the input image is not modified.
    """
    text_ = text
    if text_ is None:
        text_ = [""] * len(boxes)
    new_img = img.copy()
    for b, t in zip(boxes, text_):
        start_point = (int(b[0]), int(b[1]))
        end_point = (int(b[2]), int(b[3]))
        # BUGFIX: the original initialized color to an empty tuple, so any label
        # other than 'mask'/'no-mask' -- including the default "" used when text
        # is None -- made cv2.rectangle raise. Fall back to green instead.
        color = (0, 255, 0)
        if t == 'mask':
            color = (0, 50, 255)
        if t == 'no-mask':
            color = (255, 0, 0)
        new_img = cv2.rectangle(new_img, start_point, end_point, color, thickness)
        if text is not None and text_size is not None:
            cv2.putText(new_img, t, (int(b[0]), int(b[1]) - 2), cv2.FONT_HERSHEY_SIMPLEX, text_size, color, 4)
    return new_img
Augmentations
# Resize-only pipeline: scales each image and its pascal_voc boxes to IMG_SIZE x IMG_SIZE.
resize = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE, p=1, always_apply=True)],
    p=1.0,
    bbox_params=A.BboxParams(
        format='pascal_voc',
        min_area=0,
        min_visibility=0,
        label_fields=['labels']))
# Augmentations for the creation of the dataset in the darknet format (yolov5):
# resize plus per-channel normalization; boxes stay in pascal_voc format.
darknet_augs = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE, p=1, always_apply=True),
    A.Normalize()],
    p=1.0,
    bbox_params=A.BboxParams(
        format='pascal_voc',
        min_area=0,
        min_visibility=0,
        label_fields=['labels']))
# CovidMask train dataset (resize-only transform; also yields image names)
mask_trainset = CovidMaskDataset(DS_PATH, dest_type='rgb', transform=resize, json_path=JSON_PATH, return_img_name=True)
# CovidMask train dataloader creation
mask_trainloader = DataLoader(mask_trainset, batch_size=TRAIN_BATCH_SIZE, shuffle=True, num_workers=8, collate_fn=collate_fn)
# CovidMask validation dataset ("val" fold of the k-fold split)
mask_valset = CovidMaskDataset(DS_PATH, dest_type='rgb', transform=resize, json_path=JSON_PATH, process="val", return_img_name=True)
# CovidMask validation dataloader creation
mask_valloader = DataLoader(mask_valset, batch_size=TRAIN_BATCH_SIZE, shuffle=False, num_workers=8, collate_fn=collate_fn)
print("Sizes:\n\n", "len(train)=", len(mask_trainset), "\n", "len(val)=", len(mask_valset))
Train examples
# Pull one training batch and display it with ground-truth boxes.
it_train = iter(mask_trainloader)
imgs_train, targets_train, train_names = next(it_train)
# (C, H, W) tensors -> (H, W, C) numpy arrays for display.
imgs_train = [img.permute(1, 2, 0).numpy() for img in imgs_train]
Im.show_all(imgs_train, train_names, n=5)
mask_detected_train = list()
for img, target in zip(imgs_train, targets_train):
    boxes = target['boxes'].tolist()
    int_lbls = target['labels']
    lbls = list()
    # NOTE(review): get_target_by_name stores labels as category_id - 1, which
    # presumably yields 0/1, yet these comparisons test for 1/2 — confirm the
    # annotation category ids; boxes may be mislabeled or left untitled here.
    for int_lbl in int_lbls:
        if int_lbl == 1:
            lbls.append('mask')
        elif int_lbl == 2:
            lbls.append('no-mask')
    mask_detected_train.append(draw_bboxes(img, boxes, lbls, 0.85, 3))
Im.show_all(mask_detected_train, train_names, n=5)
Validation examples
it_val = iter(mask_valloader)
imgs_val, targets_val, val_names = next(it_val)
imgs_val = [img.permute(1, 2, 0).numpy() for img in imgs_val]
Im.show_all(imgs_val, val_names, n=5)
mask_detected_val = list()
for img, target in zip(imgs_val, targets_val):
boxes = target['boxes'].tolist()
int_lbls = target['labels']
lbls = list()
for int_lbl in int_lbls:
if int_lbl == 1:
lbls.append('mask')
elif int_lbl == 2:
lbls.append('no-mask')
mask_detected_val.append(draw_bboxes(img, boxes, lbls, 0.85, 3))
Im.show_all(mask_detected_val, val_names, n=5)
def add_imgs_by_dataloader_to_darknet_ds(dataloader, dest_path, process):
    """Write a dataloader's images and labels to dest_path in darknet-YOLOv5 layout.

    process is the split name ("train"/"val") and selects the images/<process>
    and labels/<process> subdirectories. Images without any bounding box are
    skipped. Each label row is "<class> <x_center> <y_center> <w> <h>" with all
    coordinates normalized to [0, 1].
    """
    i = 1
    for x, y, ids in tqdm(dataloader):
        for img, lbl, img_str_id in zip(x, y, ids):
            if len(lbl['boxes']) > 0:
                image_path = f'{dest_path}/images/{process}/{img_str_id}.png'
                label_path = f'{dest_path}/labels/{process}/{img_str_id}.txt'
                # Save image as uint8 HWC.
                img_np = img.permute(1, 2, 0).numpy().astype(np.uint8)
                plt.imsave(image_path, img_np)
                with open(label_path, 'w') as label_file:
                    height, width = img_np.shape[0], img_np.shape[1]
                    for b, l in zip(lbl['boxes'], lbl['labels']):
                        # Normalizing data to the darknet format.
                        # BUGFIX: x must be normalized by the image WIDTH and y by
                        # the HEIGHT; the original divided x by shape[0] (height)
                        # and y by shape[1] (width). The bug was masked only
                        # because the square IMG_SIZE resize makes them equal.
                        x1, x2 = b[0].item() / width, b[2].item() / width
                        y1, y2 = b[1].item() / height, b[3].item() / height
                        w, h = (x2 - x1), (y2 - y1)
                        x_center, y_center = (x1 + w / 2), (y1 + h / 2)
                        label_file.write(f'{l} {x_center} {y_center} {w} {h}\n')
                print(f'{process}-{i} Added {img_str_id}')
                i += 1
def create_darknet_ds(src_path, dest_path, dest_type="rgb", transforms=resize, json_path=JSON_PATH):
    """Build a darknet-YOLOv5 formatted dataset at dest_path from the one at src_path.

    Each of the "train" and "val" splits is loaded through CovidMaskDataset
    (converted to dest_type and passed through the given transforms) and then
    written out via add_imgs_by_dataloader_to_darknet_ds under a directory
    named after the split.
    """
    for split in ("train", "val"):
        split_ds = CovidMaskDataset(src_path, dest_type, transform=transforms,
                                    json_path=json_path, return_img_name=True, process=split)
        split_dl = DataLoader(split_ds, batch_size=TEST_BATCH_SIZE, shuffle=False,
                              num_workers=8, collate_fn=collate_fn)
        add_imgs_by_dataloader_to_darknet_ds(split_dl, dest_path, split)
# Darknet-Yolo Dataset
# Materialize the darknet/YOLOv5-format dataset on disk from the COCO-style one.
create_darknet_ds(DS_PATH, DARKNET_DS_PATH)
# Saving the dataset in the darknet format, split into train and validation sets
# !zip -r ./yolov5x-covidmask-dataset.zip ./yolov5x-covidmask-dataset
%cd yolov5
!pwd
Yolov5 requirements and imports
!pip install -r requirements.txt
from utils.datasets import letterbox
from utils.general import non_max_suppression, scale_coords, plot_results
Yolov5 10 epochs with batch size equals to 8 (4 minutes of training)
!python train.py --img 512 --batch 8 --epochs 10 --data ./data/yolov5x-covidmask-dataset.yaml --cfg ./models/yolov5x_covidmask.yaml --weights yolov5x.pt --hyp ./data/hyp.covidmask.yaml --name yolov5x_covidmask
This graph tests the quality of the model on both train and validation datasets, as a function of training epoch
# Plot per-epoch training/validation metrics from the 10-epoch run directory.
plot_results(save_dir='./runs/yolov5x_covidmask_10')
Yolov5 350 epochs with batch size equals to 16 (3-4 hours of training)
!python train.py --img 512 --batch 16 --epochs 350 --data ./data/yolov5x-covidmask-dataset.yaml --cfg ./models/yolov5x_covidmask.yaml --weights '' --hyp ./data/hyp_vgg16.covidmask.yaml --name yolov5x_covidmask
This graph tests the quality of the model on both train and validation datasets, as a function of training epoch
# Plot per-epoch training/validation metrics from the 350-epoch run directory.
plot_results(save_dir='./runs/yolov5x_covidmask_350')
def copy_images(src_path, dest_path, img_types="rgb"):
    """Copy every image under src_path into dest_path as .jpg files.

    Images are read with OpenCV (BGR) and converted to the requested
    img_types ("rgb" or "gray") before saving; any other value keeps
    the image as read.
    """
    for src in glob.glob(src_path + "/*.*"):
        img = cv2.imread(src)
        if img_types == "rgb":
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        elif img_types == "gray":
            img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        stem = src[src.rfind('/') + 1 : src.rfind('.')]
        plt.imsave(f'{dest_path}/{stem}.jpg', img)
# CovidMask validation dataset, recreated without image names because
# evaluate_yolo below iterates plain (images, targets) pairs.
mask_valset = CovidMaskDataset(DS_PATH, dest_type='rgb', transform=resize, json_path=JSON_PATH, process="val", return_img_name=False)
# CovidMask validation dataloader creation
mask_valloader = DataLoader(mask_valset, batch_size=TRAIN_BATCH_SIZE, shuffle=False, num_workers=8, collate_fn=collate_fn)
def iou(box1, box2):
    """Return the intersection-over-union of two pascal_voc boxes.

    Each box is [x_min, y_min, x_max, y_max]. Elements may be scalar tensors
    (the original supported case) or plain numbers — anything exposing .item()
    or convertible with int(). Coordinates are truncated to ints (pixel-grid
    boxes). Returns 0 when the boxes do not overlap.
    """
    def _as_int(v):
        # Generalization: accept plain numbers in addition to scalar tensors.
        return int(v.item()) if hasattr(v, "item") else int(v)

    x1min, y1min, x1max, y1max = (_as_int(v) for v in box1)
    x2min, y2min, x2max, y2max = (_as_int(v) for v in box2)
    if max(x1min, x2min) > min(x1max, x2max) or max(y1min, y2min) > min(y1max, y2max):  # There is no intersection
        return 0
    intersection = (min(x1max, x2max) - max(x1min, x2min)) * (min(y1max, y2max) - max(y1min, y2min))
    union = (x1max - x1min) * (y1max - y1min) + (x2max - x2min) * (y2max - y2min) - intersection  # area1 + area2 - intersection
    # Epsilon guards against degenerate zero-area boxes.
    return intersection / (union + 1e-9)
def detect1Image(im0, imgsz, model, device, conf_thres, iou_thres):
    # Given an input image (im0, a CHW tensor), its target size (imgsz), a model,
    # a device, a score conf_thres and an iou_thres,
    # returns (boxes, scores): the model's detections on the image, filtered by
    # both thresholds via NMS, with boxes rescaled back to im0's resolution.
    im0 = im0.permute(1, 2, 0).numpy().astype(np.uint8)  # CHW tensor -> HWC uint8
    # Letterbox-resize to the model input size.
    img = letterbox(im0, new_shape=imgsz)[0]
    # Convert
    img = img[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, to 3x416x416
    img = np.ascontiguousarray(img)
    img = torch.from_numpy(img).to(device)
    img = img.float()  # uint8 to fp16/32
    img /= 255.0  # scale pixel values to [0, 1]
    if img.ndimension() == 3:
        img = img.unsqueeze(0)  # add the batch dimension
    # Inference
    pred = model(img, augment=False)[0]
    # Apply NMS
    pred = non_max_suppression(pred, conf_thres, iou_thres)
    boxes = []
    scores = []
    for i, det in enumerate(pred):  # detections per image
        if det is not None and len(det):
            # Rescale boxes from img_size to im0 size
            det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
            # Write results
            for *xyxy, conf, cls in det:
                boxes.append([int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])])
                scores.append(conf)
    return np.array(boxes), np.array(scores)
def evaluate_yolo(valloader, model, iou_thresh=0.45, score_thresh=0.45, weights=(1, 1), device="cpu"):
    """Compute a weighted F-measure of a YOLOv5 model over a validation loader.

    Predictions are greedily matched one-to-one to ground-truth boxes by
    IoU > iou_thresh; detections below score_thresh are dropped inside
    detect1Image. weights = (w_precision, w_recall); device is used for
    inference. Returns (f_measure, true_positives, false_positives,
    false_negatives).
    """
    true_positives, false_positives, false_negatives = 0, 0, 0
    for images, targets in tqdm(valloader):
        for img, t in zip(images, targets):
            boxes, scores = detect1Image(img, img.shape[1], model, device, score_thresh, iou_thresh)
            scores = [s.cpu().item() for s in scores]
            if len(t['boxes']) == 0 and len(boxes) == 0:
                # Nothing to detect and nothing detected: a correct "empty" frame.
                continue
            lbls_num = len(t['boxes'])
            preds_num = len(boxes)
            # BUGFIX: the original tracked matched boxes in sets of tensors.
            # torch.Tensor hashes by identity and iterating t['boxes'] yields
            # fresh view objects each pass, so the "already matched" membership
            # test could never fire and labels could be matched repeatedly.
            # Track matched ground-truth boxes by index instead.
            matched_lbls = set()
            matched_preds = 0
            for bp in boxes:
                for j, bl in enumerate(t['boxes']):
                    if j in matched_lbls:
                        continue  # this label already matched another prediction
                    if iou(bl, bp) > iou_thresh:
                        matched_lbls.add(j)
                        matched_preds += 1
                        break  # one ground-truth box per prediction
            true_positives += len(matched_lbls)
            false_positives += preds_num - matched_preds
            false_negatives += lbls_num - matched_preds
    precision = true_positives / (1e-9 + false_positives + true_positives)
    recall = true_positives / (1e-9 + false_negatives + true_positives)
    # BUGFIX: the original averaged precision with itself; a weighted F-measure
    # must combine precision AND recall.
    f_measure = (weights[0] * precision + weights[1] * recall) / (weights[0] + weights[1])
    return f_measure, true_positives, false_positives, false_negatives
Yolov5 10 epochs with batch size equals to 8 (4 minutes of training)
# Loading the 10-epoch YOLOv5 checkpoint (last.pt) and switching it to eval mode
yolov5_10 = torch.load(YOLO_PATH_10, map_location=device)['model'].float()
yolov5_10.to(device).eval();
f_measure, tp, fp, fn = evaluate_yolo(mask_valloader, yolov5_10, device=device)
# BUGFIX: the printed model name said "detectron2_1f" (a copy-paste leftover);
# this result belongs to the 10-epoch YOLOv5 model.
print(f'Yolov5 with 10 epochs --> F-Measure = {f_measure}\nTP: {tp}\tFP: {fp}\tFN: {fn}')
Showing the images and all of the wrong detections
# If necessary, this function copies the validation images to the inference path to arrange the images for detection
# copy_images(src_path="/content/lab1/yolov5x-covidmask-dataset/images/val", dest_path="/content/yolov5/inference/images")
# If necessary, remove the inferences to clear the directories before detecting with another model
# !rm -rvf ./inference/output
!python detect.py --weights ./runs/yolov5x_covidmask_10/weights/last.pt --img 512 --conf 0.2 --source ./inference/images --save-txt
# Collect the annotated detection outputs written by detect.py and display them.
predicted_paths = glob.glob("./inference/output/*.jpg")
predicted_images, ids = list(), list()
for p in predicted_paths:
    predicted_images.append(plt.imread(p))
    # Use the file stem as the display title.
    ids.append(str(p[p.rfind('/') + 1 : p.rfind('.')]))
Im.show_all(predicted_images[:5], ids[:5])
Yolov5 350 epochs with batch size equals to 16 (3-4 hours of training)
# Loading YOLOv5 (350-epoch checkpoint, best weights) and switching to eval mode
yolov5_350 = torch.load(YOLO_PATH_350, map_location=device)['model'].float()
yolov5_350.to(device).eval();
f_measure, tp, fp, fn = evaluate_yolo(mask_valloader, yolov5_350, device=device)
print(f'Yolov5 with 350 epochs --> F-Measure = {f_measure}\nTP: {tp}\tFP: {fp}\tFN: {fn}')
# If necessary, this function copies the validation images to the inference path to arrange the images for detection
# copy_images(src_path="/content/lab1/yolov5x-covidmask-dataset/images/val", dest_path="/content/yolov5/inference/images")
# If necessary, remove the inferences to clear the directories before detecting with another model
# !rm -rvf ./inference/output
!python detect.py --weights ./runs/yolov5x_covidmask_350/weights/best.pt --img 512 --conf 0.4 --source ./inference/images --save-txt
# Collect the 350-epoch model's annotated detections and display them 5 at a time.
predicted_paths = glob.glob("./inference/output/*.jpg")
predicted_images, ids = list(), list()
for p in predicted_paths:
    predicted_images.append(plt.imread(p))
    # Use the file stem as the display title.
    ids.append(str(p[p.rfind('/') + 1 : p.rfind('.')]))
Im.show_all(predicted_images[:5], ids[:5])
Im.show_all(predicted_images[5:10], ids[5:10])
Im.show_all(predicted_images[10:15], ids[10:15])
Im.show_all(predicted_images[15:20], ids[15:20])
Saving the models
!zip -r "/content/yolov5_covidmask.zip" "/content/yolov5"
!cp "/content/yolov5_covidmask.zip" "/content/gdrive/My Drive/CV2 course/"